I built an AWS IoT Analytics environment with AWS CloudFormation for analyzing AWS IoT SiteWise gateway data in Amazon QuickSight

2022.07.07

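Previously, I showed how to take equipment operation data collected by a SiteWise gateway, transform it with IoT Analytics, and use it in QuickSight.

I have personally had to build this configuration quite often, so I made it deployable with CloudFormation.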

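Changes from the previous resources: the S3 buckets are user-managed

In the previous article, I created the IoT Analytics resources with the single-step setup. With the single-step setup, however, the S3 buckets are AWS-managed, so users cannot look at the bucket contents directly.

For that reason, this template creates every S3 bucket as a user-managed bucket.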

Environment that gets created

The template creates the resources inside the red frame below.
The IoT Analytics channel, data store, and dataset each get their own S3 bucket.

01-cfn-iot-analytics-diagram

CloudFormation template

The template is as follows.

AWSTemplateFormatVersion: "2010-09-09"
Description: AWS IoT Analytics and Lambda to convert TQV data (SiteWise Gateway data).
###############################################################
# Parameters
###############################################################
Parameters:
  NamePrefix:
    Description: "Set resource name prefix. The following expressions are permitted: ^[a-z0-9]+$"
    Type: String
    AllowedPattern: ^[a-z0-9]+$
  UseLambda:
    Description: "Choose whether to use the Lambda function"
    Default: UseLambda
    Type: String
    AllowedValues:
      - UseLambda
      - NoUseLambda

###############################################################
# Conditions
###############################################################
Conditions:
  CreateLambdaResources: !Equals [!Ref UseLambda, "UseLambda"]
  SkipLambdaResources: !Equals [!Ref UseLambda, "NoUseLambda"]

###############################################################
# Resources
###############################################################
Resources:
# ------------------------------------------------------------#
# S3 Bucket - for IoT Analytics Channel
# ------------------------------------------------------------# 
  BucketForIoTAnalyticsChannel:
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    Properties: 
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: !Sub ${NamePrefix}-iot-analytics-channel-bucket
 
  # Bucket Policy for IoT Analytics Channel Bucket
  BucketPolicyForIoTAnalyticsChannnelBucket:
    Type: AWS::S3::BucketPolicy
    DeletionPolicy: Retain
    Properties: 
      Bucket: !Ref BucketForIoTAnalyticsChannel
      PolicyDocument: { "Version": "2012-10-17",
        "Id": "MyPolicyID",
        "Statement": [
            {
                "Sid": "MyStatementSid",
                "Effect": "Allow",
                "Principal": {
                    "Service": "iotanalytics.amazonaws.com"
                },
                "Action": [
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListMultipartUploadParts",
                    "s3:AbortMultipartUpload",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsChannel}",
                    "Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsChannel}/*"
                ]
              }
          ]
      }

# ------------------------------------------------------------#
# S3 Bucket - for IoT Analytics Data store
# ------------------------------------------------------------# 
  BucketForIoTAnalyticsDatastore:
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    Properties: 
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: !Sub ${NamePrefix}-iot-analytics-datastore-bucket

  # Bucket Policy for IoT Analytics DataStore Bucket
  BucketPolicyForIoTAnalyticsDatastorelBucket:
    Type: AWS::S3::BucketPolicy
    DeletionPolicy: Retain
    Properties: 
      Bucket: !Ref BucketForIoTAnalyticsDatastore
      PolicyDocument: { "Version": "2012-10-17",
        "Id": "MyPolicyID",
        "Statement": [
            {
                "Sid": "MyStatementSid",
                "Effect": "Allow",
                "Principal": {
                    "Service": "iotanalytics.amazonaws.com"
                },
                "Action": [
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListMultipartUploadParts",
                    "s3:AbortMultipartUpload",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsDatastore}",
                    "Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsDatastore}/*"
                ]
              }
          ]
      }

# ------------------------------------------------------------#
# S3 Bucket - for IoT Analytics Data set
# ------------------------------------------------------------# 
  BucketForIoTAnalyticsDataset:
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    Properties: 
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: !Sub ${NamePrefix}-iot-analytics-dataset-bucket
    DependsOn: IoTAnalyticsDatastore

  # Bucket Policy for IoT Analytics Dataset Bucket
  BucketPolicyForIoTAnalyticsDatasetBucket:
    Type: AWS::S3::BucketPolicy
    DeletionPolicy: Retain
    Properties: 
      Bucket: !Ref BucketForIoTAnalyticsDataset
      PolicyDocument: { "Version": "2012-10-17",
        "Id": "MyPolicyID",
        "Statement": [
            {
                "Sid": "MyStatementSid",
                "Effect": "Allow",
                "Principal": {
                    "Service": "iotanalytics.amazonaws.com"
                },
                "Action": [
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListMultipartUploadParts",
                    "s3:AbortMultipartUpload",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsDataset}",
                    "Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsDataset}/*"
                ]
              }
          ]
      }

# ------------------------------------------------------------#
# IAM Policy for access S3 Buckets from IoT Analytics
# ------------------------------------------------------------# 
# IAM Policy (IoT Analytics Channel -> IoT Analytics Bucket) 
  PolicyForIoTAnalyticsChannelToBucket:
    Type: AWS::IAM::ManagedPolicy
    Properties: 
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
        - Effect: Allow
          Action:
          - s3:PutObject
          - s3:GetObject
          - s3:ListBucket
          - s3:GetBucketLocation
          Resource:
          - !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}
          - !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}/*
      ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-channel-policy

# IAM Policy (IoT Analytics DataStore -> IoT Analytics Bucket) 
  PolicyForIoTAnalyticsDatastoreToBucket:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
        - Effect: Allow
          Action:
          - s3:PutObject
          - s3:DeleteObject
          - s3:GetBucketLocation
          Resource:
          - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}
          - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}/*
      ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-datastore-policy

# IAM Policy (IoT Analytics DataSet -> IoT Analytics Bucket) 
  PolicyForIoTAnalyticsDatasetToBucket:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
        - Effect: Allow
          Action:
          - s3:PutObject
          Resource:
          - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDataset}/*
      ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-dataset-policy

# ------------------------------------------------------------#
# IAM Role for access S3 Buckets from IoT Analytics
# ------------------------------------------------------------# 
# IAM Role (IoT Analytics Channel -> IoT Analytics Bucket) 
  RoleForIoTAnalyticsChannelToBucket:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Principal:
              Service:
                - "iotanalytics.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      RoleName: !Sub ${NamePrefix}-iot-analytics-channel-bucket-role
      ManagedPolicyArns:
        - !Ref PolicyForIoTAnalyticsChannelToBucket

# IAM Role (IoT Analytics Datastore -> IoT Analytics Bucket) 
  RoleForIoTAnalyticsDatastoreToBucket:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "iotanalytics.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      RoleName: !Sub ${NamePrefix}-iot-analytics-datastore-bucket-role
      ManagedPolicyArns:
        - !Ref PolicyForIoTAnalyticsDatastoreToBucket

# IAM Role (IoT Analytics Dataset -> IoT Analytics Bucket) 
  RoleForIoTAnalyticsDatasetToBucket:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Principal:
              Service:
                - "iotanalytics.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      RoleName: !Sub ${NamePrefix}-iot-analytics-dataset-bucket-role
      ManagedPolicyArns:
        - !Ref PolicyForIoTAnalyticsDatasetToBucket

# ------------------------------------------------------------#
# AWS IoT Analytics
# ------------------------------------------------------------# 
  IoTAnalyticsChannel:
    Type: AWS::IoTAnalytics::Channel
    Properties:
      ChannelName: !Sub ${NamePrefix}_channel
      ChannelStorage:
        CustomerManagedS3:
          Bucket: !Ref BucketForIoTAnalyticsChannel
          #KeyPrefix: String
          RoleArn: !GetAtt RoleForIoTAnalyticsChannelToBucket.Arn
    DependsOn: BucketForIoTAnalyticsChannel

  IoTAnalyticsDatastore:
    Type: AWS::IoTAnalytics::Datastore
    Properties:
      DatastoreName: !Sub ${NamePrefix}_datastore
      DatastoreStorage:
          CustomerManagedS3:
            Bucket: !Ref BucketForIoTAnalyticsDatastore
            #KeyPrefix: String
            RoleArn: !GetAtt RoleForIoTAnalyticsDatastoreToBucket.Arn
    DependsOn: BucketForIoTAnalyticsDatastore

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Condition: CreateLambdaResources
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      ManagedPolicyArns:
        - !Sub 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'

  LambdaFunction:
    Type: AWS::Lambda::Function
    Condition: CreateLambdaResources
    Properties:
      FunctionName: !Sub ${NamePrefix}-pipeline-lambda-function
      Handler: index.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Runtime: python3.9
      Code:
        ZipFile: |
          import json
          import logging
          import sys
          import time
          from datetime import datetime, timezone, timedelta

          # Configure logging
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          streamHandler = logging.StreamHandler(stream=sys.stdout)
          formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
          streamHandler.setFormatter(formatter)
          logger.addHandler(streamHandler)

          # timezone
          JST = timezone(timedelta(hours=+9), 'JST')

          def lambda_handler(event, context):
              logger.info("event: {}".format(event))
              transformed = []
              
              for e in event:
                  # e contains all the data for a single asset
                  logger.info("e: {}".format(json.dumps(e, indent=2)))
                  print(type(e)) #<class 'dict'>
                  property_alias = e['propertyAlias']

                  propertyValuesList = e['propertyValues']
                  property_value = propertyValuesList[0]

                  value = ""
                  if 'doubleValue' in property_value['value']:
                      value = property_value['value']['doubleValue']
                      logger.info("value_type: doubleValue")
                  if 'integerValue' in property_value['value']:
                      value = property_value['value']['integerValue']
                      logger.info("value_type: integerValue")
                  if 'booleanValue' in property_value['value']:
                      value = property_value['value']['booleanValue']
                      logger.info("value_type: booleanValue")
                  if 'stringValue' in property_value['value']:
                      value = property_value['value']['stringValue']
                      logger.info("value_type: stringValue")

                  quality = ""
                  if 'quality' in property_value:
                      quality = property_value['quality']
                      logger.debug("quality in payload")
                  
                  timestamp = ""
                  unixtime = ""
                  nanoseconds = ""
                  devicetime = ""
                  if 'timestamp' in property_value and 'timeInSeconds' in property_value['timestamp'] and 'offsetInNanos' in property_value['timestamp']:
                      inttime = property_value['timestamp']['timeInSeconds'] # 1655307221
                      # offsetInNanos is in nanoseconds; convert it to fractional seconds
                      fraction = property_value['timestamp']['offsetInNanos'] / 1000000000 # 0.157
                      print("fraction: " + str(fraction))
                      unixtime = inttime + fraction # 1655307221.157
                      print("unixtime: " + str(unixtime))
                      # Build a timezone-aware JST datetime directly from the epoch value
                      dt = datetime.fromtimestamp(unixtime, tz=JST)
                      timestamp = dt.isoformat(timespec='milliseconds') # 2022-06-16T00:33:41.157+09:00
                      print("timestamp: " + str(timestamp))

                  if 'timestamp' in property_value and 'timeInSeconds' in property_value['timestamp'] and 'offsetInNanos' not in property_value['timestamp']:
                      # No sub-second offset in the payload, so use whole seconds only
                      unixtime = property_value['timestamp']['timeInSeconds'] # 1655307221
                      dt = datetime.fromtimestamp(unixtime, tz=JST)
                      timestamp = dt.isoformat(timespec='milliseconds') # 2022-06-16T00:33:41.000+09:00

                  row = {}
                  row['propertyAlias'] = property_alias
                  row['value'] = value
                  row['timestamp'] = timestamp
                  row['quality'] = quality
                  logger.debug("row: {}".format(row))
                  transformed.append(row)

              logger.info("transformed: {}\n".format(json.dumps(transformed, indent=2)))
              return transformed # the return value must be a list

  LambdaPermission:
    Type: AWS::Lambda::Permission
    Condition: CreateLambdaResources
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt LambdaFunction.Arn
      Principal: iotanalytics.amazonaws.com

  IoTAnalyticsPipelineLambdaTrue:
    Type: AWS::IoTAnalytics::Pipeline
    Condition: CreateLambdaResources
    Properties:
      PipelineName: !Sub ${NamePrefix}_pipeline
      PipelineActivities:
        - Channel:
            # Specify the source of the data
            Name: pipeline-channel-activity
            ChannelName: !Sub ${NamePrefix}_channel
            Next: pipeline-lambda-activity
          Lambda:
            # Use Lambda to convert the TQV data into JSON with all keys flattened to the same level
            Name: pipeline-lambda-activity
            BatchSize: 1
            LambdaName: !Sub ${NamePrefix}-pipeline-lambda-function
            Next: pipeline-datastore-activity
          Datastore:
            # Specify where the data is stored
            Name: pipeline-datastore-activity
            DatastoreName: !Sub ${NamePrefix}_datastore

  IoTAnalyticsPipelineLambdaFalse:
    Type: AWS::IoTAnalytics::Pipeline
    Condition: SkipLambdaResources
    Properties:
      PipelineName: !Sub ${NamePrefix}_pipeline
      PipelineActivities:
        - Channel:
            # Specify the source of the data
            Name: pipeline-channel-activity
            ChannelName: !Sub ${NamePrefix}_channel
            Next: pipeline-datastore-activity
          Datastore:
            # Specify where the data is stored
            Name: pipeline-datastore-activity
            DatastoreName: !Sub ${NamePrefix}_datastore

  IoTAnalyticsDataset:
    Type: AWS::IoTAnalytics::Dataset
    Properties:
      DatasetName: !Sub ${NamePrefix}_dataset
      Actions:
        - ActionName: SqlAction
          QueryAction:
            # Query that retrieves the data
            SqlQuery: !If [ CreateLambdaResources, !Sub "SELECT propertyAlias, value, timestamp, quality FROM ${NamePrefix}_datastore WHERE __dt > current_date - interval '1' day", !Sub "SELECT * FROM ${NamePrefix}_datastore WHERE __dt > current_date - interval '1' day" ]
      RetentionPeriod:
        # Number of days to retain the dataset content
        NumberOfDays: 1
        Unlimited: false
      Triggers:
        # Refresh the dataset automatically
        - Schedule:
            ScheduleExpression: rate(5 minutes)
      # Deliver the dataset content to a user-managed S3 bucket
      ContentDeliveryRules:
        - Destination:
            S3DestinationConfiguration:
              Bucket: !Ref BucketForIoTAnalyticsDataset
              Key: '!{iotanalytics:scheduleTime}/!{iotanalytics:versionId}.csv'
              RoleArn: !GetAtt RoleForIoTAnalyticsDatasetToBucket.Arn
    DependsOn: IoTAnalyticsDatastore

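Specifying whether to create the Lambda function

As mentioned at the beginning, this template builds an environment that turns TQV (timestamp-quality-value) data from a SiteWise gateway into a shape that is easy to analyze in QuickSight.
The Lambda function it deploys therefore converts the TQV data into a suitable form.

In some cases, though, you may want only the IoT Analytics environment (no Lambda function, or a function of your own that does the processing you need). For that reason, the template takes a parameter that lets you choose whether the Lambda function is created.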
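
To make the conversion concrete, here is a minimal standalone Python sketch of the same flattening idea. It is not the deployed function itself. The sample message, including the alias `/Factory/Line1/Temperature` and its values, is made up for illustration, and `flatten_tqv` is a hypothetical helper name. Like the Lambda function in the template, it looks only at the first entry of `propertyValues` and emits the timestamp in JST.

```python
from datetime import datetime, timedelta, timezone

JST = timezone(timedelta(hours=9), "JST")

# Hypothetical TQV message; the alias and values are invented for this example.
SAMPLE_EVENT = [
    {
        "propertyAlias": "/Factory/Line1/Temperature",
        "propertyValues": [
            {
                "value": {"doubleValue": 23.5},
                "timestamp": {"timeInSeconds": 1655307221, "offsetInNanos": 157000000},
                "quality": "GOOD",
            }
        ],
    }
]

VALUE_KEYS = ("doubleValue", "integerValue", "booleanValue", "stringValue")


def flatten_tqv(message):
    """Flatten one TQV message into a single-level dict (first value only)."""
    property_value = message["propertyValues"][0]
    raw_value = property_value["value"]
    # Pick whichever typed value key is present; fall back to an empty string.
    value = next((raw_value[k] for k in VALUE_KEYS if k in raw_value), "")

    ts = property_value.get("timestamp", {})
    timestamp = ""
    if "timeInSeconds" in ts:
        dt = datetime.fromtimestamp(ts["timeInSeconds"], tz=JST)
        # Integer arithmetic avoids float rounding on the sub-second part.
        dt += timedelta(microseconds=ts.get("offsetInNanos", 0) // 1000)
        timestamp = dt.isoformat(timespec="milliseconds")

    return {
        "propertyAlias": message["propertyAlias"],
        "value": value,
        "timestamp": timestamp,
        "quality": property_value.get("quality", ""),
    }


rows = [flatten_tqv(m) for m in SAMPLE_EVENT]
print(rows)
```

Each row ends up with the four columns that the dataset query selects when the Lambda function is enabled: `propertyAlias`, `value`, `timestamp`, and `quality`.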

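If you choose not to create the Lambda function, the SQL statement set on the IoT Analytics SQL dataset is the one below, so change it as needed after the stack is created.
(${NamePrefix} is replaced with the value you specified when creating the CloudFormation stack.)

SELECT * FROM ${NamePrefix}_datastore WHERE __dt > current_date - interval '1' day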
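
The choice between the two dataset queries is made by the `!If` on `SqlQuery` in the template. As a rough illustration only, the same selection can be written in Python; `build_dataset_query` and the prefix `demo01` are hypothetical names used for this sketch.

```python
def build_dataset_query(name_prefix: str, use_lambda: bool) -> str:
    """Mirror the template's !If: flattened columns with Lambda, SELECT * without."""
    columns = "propertyAlias, value, timestamp, quality" if use_lambda else "*"
    return (
        f"SELECT {columns} FROM {name_prefix}_datastore "
        "WHERE __dt > current_date - interval '1' day"
    )


print(build_dataset_query("demo01", use_lambda=True))
print(build_dataset_query("demo01", use_lambda=False))
```

Both variants keep the `__dt` filter from the template, so each dataset run reads only recent partitions rather than scanning the whole data store.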

Creating the stack

When you create the stack, you specify a prefix to distinguish the resources and whether to create the Lambda function.

02-cloudformation-console

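The options for the UseLambda parameter differ as follows.

  • UseLambda : create the Lambda function
  • NoUseLambda: do not create the Lambda function

(Admittedly, one of the values ended up with the same name as the parameter itself, UseLambda...)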
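
If you would rather create the stack from a script than from the console, the two parameters can be passed through boto3. The following is a minimal sketch under a few assumptions: the stack name `iot-analytics-demo`, the prefix `demo01`, and the helper names are all hypothetical. Because the template sets explicit names on its IAM roles and managed policies, I expect `CAPABILITY_NAMED_IAM` to be required.

```python
def build_create_stack_args(stack_name, template_body, name_prefix, use_lambda=True):
    """Build keyword arguments for CloudFormation create_stack."""
    return {
        "StackName": stack_name,
        "TemplateBody": template_body,
        "Parameters": [
            {"ParameterKey": "NamePrefix", "ParameterValue": name_prefix},
            {
                "ParameterKey": "UseLambda",
                "ParameterValue": "UseLambda" if use_lambda else "NoUseLambda",
            },
        ],
        # The template names its IAM roles and managed policies explicitly.
        "Capabilities": ["CAPABILITY_NAMED_IAM"],
    }


def create_stack(args):
    """Send the request; boto3 is imported lazily so the helper above runs without it."""
    import boto3

    return boto3.client("cloudformation").create_stack(**args)


# Placeholder template body; in practice, read the template file contents here.
args = build_create_stack_args(
    "iot-analytics-demo", "<template body>", "demo01", use_lambda=False
)
print(args["Parameters"])
```

Calling `create_stack(args)` needs AWS credentials and the real template text, so the example above stops at printing the parameters.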

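In addition, NamePrefix accepts only lowercase letters and digits.
This comes from the naming rules for S3 buckets and for the various IoT Analytics resources: IoT Analytics resource names may contain underscores, whereas S3 bucket names may not.
I could have made this more flexible, but it would have made the template even longer, so I decided it was unnecessary for personal use.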
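
Since the prefix ends up inside three bucket names, it can be worth checking it before deploying. Here is a small sketch of such a check. It assumes the bucket-name suffixes used in the template and the 63-character S3 bucket name limit, and it additionally rejects an empty prefix; `check_name_prefix` is a hypothetical helper, not part of the template.

```python
import re

# Suffixes that the template appends to NamePrefix to form the bucket names.
BUCKET_SUFFIXES = (
    "-iot-analytics-channel-bucket",
    "-iot-analytics-datastore-bucket",
    "-iot-analytics-dataset-bucket",
)
S3_BUCKET_NAME_MAX = 63  # S3 bucket names can be at most 63 characters long


def check_name_prefix(prefix):
    """Return a list of problems with the prefix; an empty list means it looks fine."""
    problems = []
    if not re.fullmatch(r"[a-z0-9]+", prefix):
        problems.append("use one or more lowercase letters or digits only")
    longest_suffix = max(len(suffix) for suffix in BUCKET_SUFFIXES)
    if len(prefix) + longest_suffix > S3_BUCKET_NAME_MAX:
        max_len = S3_BUCKET_NAME_MAX - longest_suffix
        problems.append(f"keep the prefix within {max_len} characters")
    return problems


for candidate in ("demo01", "my_prefix", ""):
    print(repr(candidate), check_name_prefix(candidate))
```

Bucket names are also globally unique, so a prefix that passes this check can still collide with someone else's bucket at deploy time.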

Deleting the stack

When the stack is deleted, the S3 buckets are kept, because you may want to use the collected data for other purposes.

Using AWS-managed S3 buckets

If you want to use AWS-managed S3 buckets instead of user-managed ones, please refer to the template below.

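A template that also deletes the S3 buckets when the stack is deleted (added July 8, 2022)

I modified the template above to create a version that also deletes the S3 buckets when the stack is deleted.
Use it when you want to delete everything, including the collected data.

It uses a custom resource to delete the objects in the buckets, and the timeout of that Lambda function is set to the maximum of 900 seconds. With a very large amount of data the function may still time out, so in that case delete the objects by other means, such as an S3 lifecycle policy.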
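
As a rough illustration of what emptying a bucket involves, the sketch below deletes objects one listing page at a time. It is only my sketch of the general pattern, not the custom resource code from the template, and it assumes the buckets do not have versioning enabled. `empty_bucket` and `FakeS3Client` are hypothetical names; the fake client exists only so that the example runs without AWS access.

```python
def empty_bucket(s3_client, bucket_name):
    """Delete every object in the bucket, one listing page (up to 1,000 keys) at a time."""
    deleted = 0
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name):
        keys = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if not keys:
            continue
        s3_client.delete_objects(
            Bucket=bucket_name, Delete={"Objects": keys, "Quiet": True}
        )
        deleted += len(keys)
    return deleted


class FakeS3Client:
    """In-memory stand-in for boto3.client("s3"), used only to exercise the sketch."""

    def __init__(self, keys, page_size=2):
        self.keys = list(keys)
        self.page_size = page_size

    def get_paginator(self, operation_name):
        assert operation_name == "list_objects_v2"
        return self

    def paginate(self, Bucket):
        snapshot = list(self.keys)
        for i in range(0, len(snapshot), self.page_size):
            yield {"Contents": [{"Key": k} for k in snapshot[i:i + self.page_size]]}

    def delete_objects(self, Bucket, Delete):
        for obj in Delete["Objects"]:
            self.keys.remove(obj["Key"])
        return {}


fake = FakeS3Client(["a.csv", "b.csv", "c.csv"])
count = empty_bucket(fake, "demo01-iot-analytics-dataset-bucket")
print(count, fake.keys)
```

With a real client you would pass `boto3.client("s3")` instead of the fake. Every page costs one list call and one delete call, which is why a bucket with a huge number of objects can outlast the 900-second timeout.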

---
AWSTemplateFormatVersion: "2010-09-09"
Description: AWS IoT Analytics and Lambda to convert TQV data (SiteWise Gateway data). When the stack is deleted, the S3 buckets are also deleted.
###############################################################
# Parameters
###############################################################
Parameters:
  NamePrefix:
    Description: "Set resource name prefix. The following expressions are permitted: ^[a-z0-9]+$"
    Type: String
    AllowedPattern: ^[a-z0-9]+$
  UseLambda:
    Description: "Choose whether to use the Lambda function"
    Default: UseLambda
    Type: String
    AllowedValues:
      - UseLambda
      - NoUseLambda

###############################################################
# Conditions
###############################################################
Conditions:
  CreateLambdaResources: !Equals [!Ref UseLambda, "UseLambda"]
  SkipLambdaResources: !Equals [!Ref UseLambda, "NoUseLambda"]

###############################################################
# Resources
###############################################################
Resources:
# ------------------------------------------------------------#
# S3 Bucket - for IoT Analytics Channel
# ------------------------------------------------------------#
  BucketForIoTAnalyticsChannel:
    Type: AWS::S3::Bucket
    Properties: 
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: !Sub ${NamePrefix}-iot-analytics-channel-bucket
 
  # Bucket Policy for IoT Analytics Channel Bucket
  BucketPolicyForIoTAnalyticsChannnelBucket:
    Type: AWS::S3::BucketPolicy
    Properties: 
      Bucket: !Ref BucketForIoTAnalyticsChannel
      PolicyDocument: { "Version": "2012-10-17",
        "Id": "MyPolicyID",
        "Statement": [
            {
                "Sid": "MyStatementSid",
                "Effect": "Allow",
                "Principal": {
                    "Service": "iotanalytics.amazonaws.com"
                },
                "Action": [
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListMultipartUploadParts",
                    "s3:AbortMultipartUpload",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsChannel}",
                    "Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsChannel}/*"
                ]
              }
          ]
      }

# ------------------------------------------------------------#
# S3 Bucket - for IoT Analytics Data store
# ------------------------------------------------------------#
  BucketForIoTAnalyticsDatastore:
    Type: AWS::S3::Bucket
    Properties: 
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: !Sub ${NamePrefix}-iot-analytics-datastore-bucket

  # Bucket Policy for IoT Analytics DataStore Bucket
  BucketPolicyForIoTAnalyticsDatastorelBucket:
    Type: AWS::S3::BucketPolicy
    Properties: 
      Bucket: !Ref BucketForIoTAnalyticsDatastore
      PolicyDocument: { "Version": "2012-10-17",
        "Id": "MyPolicyID",
        "Statement": [
            {
                "Sid": "MyStatementSid",
                "Effect": "Allow",
                "Principal": {
                    "Service": "iotanalytics.amazonaws.com"
                },
                "Action": [
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListMultipartUploadParts",
                    "s3:AbortMultipartUpload",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsDatastore}",
                    "Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsDatastore}/*"
                ]
              }
          ]
      }

# ------------------------------------------------------------#
# S3 Bucket - for IoT Analytics Data set
# ------------------------------------------------------------# 
  BucketForIoTAnalyticsDataset:
    Type: AWS::S3::Bucket
    Properties: 
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: !Sub ${NamePrefix}-iot-analytics-dataset-bucket
    DependsOn: IoTAnalyticsDatastore

  # Bucket Policy for IoT Analytics Dataset Bucket
  BucketPolicyForIoTAnalyticsDatasetBucket:
    Type: AWS::S3::BucketPolicy
    Properties: 
      Bucket: !Ref BucketForIoTAnalyticsDataset
      PolicyDocument: { "Version": "2012-10-17",
        "Id": "MyPolicyID",
        "Statement": [
            {
                "Sid": "MyStatementSid",
                "Effect": "Allow",
                "Principal": {
                    "Service": "iotanalytics.amazonaws.com"
                },
                "Action": [
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListMultipartUploadParts",
                    "s3:AbortMultipartUpload",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsDataset}",
                    "Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsDataset}/*"
                ]
              }
          ]
      }


# ------------------------------------------------------------#
# IAM Policy for access S3 Buckets from IoT Analytics
# ------------------------------------------------------------# 
# IAM Policy (IoT Analytics Channel -> IoT Analytics Bucket) 
  PolicyForIoTAnalyticsChannelToBucket:
    Type: AWS::IAM::ManagedPolicy
    Properties: 
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
        - Effect: Allow
          Action:
          - s3:PutObject
          - s3:GetObject
          - s3:ListBucket
          - s3:GetBucketLocation
          Resource:
          - !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}
          - !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}/*
      ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-channel-policy

# IAM Policy (IoT Analytics DataStore -> IoT Analytics Bucket) 
  PolicyForIoTAnalyticsDatastoreToBucket:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
        - Effect: Allow
          Action:
          - s3:PutObject
          - s3:DeleteObject
          - s3:GetBucketLocation
          Resource:
          - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}
          - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}/*
      ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-datastore-policy

# IAM Policy (IoT Analytics DataSet -> IoT Analytics Bucket) 
  PolicyForIoTAnalyticsDatasetToBucket:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
        - Effect: Allow
          Action:
          - s3:PutObject
          Resource:
          - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDataset}/*
      ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-dataset-policy

# ------------------------------------------------------------#
# IAM Role for access S3 Buckets from IoT Analytics
# ------------------------------------------------------------# 
# IAM Role (IoT Analytics Channel -> IoT Analytics Bucket) 
  RoleForIoTAnalyticsChannelToBucket:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Principal:
              Service:
                - "iotanalytics.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      RoleName: !Sub ${NamePrefix}-iot-analytics-channel-bucket-role
      ManagedPolicyArns:
        - !Ref PolicyForIoTAnalyticsChannelToBucket

# IAM Role (IoT Analytics Datastore -> IoT Analytics Bucket) 
  RoleForIoTAnalyticsDatastoreToBucket:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "iotanalytics.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      RoleName: !Sub ${NamePrefix}-iot-analytics-datastore-bucket-role
      ManagedPolicyArns:
        - !Ref PolicyForIoTAnalyticsDatastoreToBucket

# IAM Role (IoT Analytics Dataset -> IoT Analytics Bucket) 
  RoleForIoTAnalyticsDatasetToBucket:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Principal:
              Service:
                - "iotanalytics.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      RoleName: !Sub ${NamePrefix}-iot-analytics-dataset-bucket-role
      ManagedPolicyArns:
        - !Ref PolicyForIoTAnalyticsDatasetToBucket

# ------------------------------------------------------------#
# Lambda
# ------------------------------------------------------------# 
# Lambda for IoT Analytics activity 
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Condition: CreateLambdaResources
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      ManagedPolicyArns:
        - !Sub 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'

  LambdaFunctionIoTAnalyticsActivity:
    Type: AWS::Lambda::Function
    Condition: CreateLambdaResources
    Properties:
      FunctionName: !Sub ${NamePrefix}-pipeline-lambda-function
      Handler: index.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Runtime: python3.9
      Code:
        ZipFile: |
          import json
          import logging
          import sys
          import time
          from datetime import datetime, timezone, timedelta

          # Configure logging
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          streamHandler = logging.StreamHandler(stream=sys.stdout)
          formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
          streamHandler.setFormatter(formatter)
          logger.addHandler(streamHandler)

          # timezone
          JST = timezone(timedelta(hours=+9), 'JST')

          def lambda_handler(event, context):
              logger.info("event: {}".format(event))
              transformed = []
              
              for e in event:
                  # e contains all the data for a single asset
                  logger.info("e: {}".format(json.dumps(e, indent=2)))
                  print(type(e)) #<class 'dict'>
                  property_alias = e['propertyAlias']

                  propertyValuesList = e['propertyValues']
                  property_value = propertyValuesList[0]

                  value = ""
                  if 'doubleValue' in property_value['value']:
                      value = property_value['value']['doubleValue']
                      logger.info("value_type: doubleValue")
                  if 'integerValue' in property_value['value']:
                      value = property_value['value']['integerValue']
                      logger.info("value_type: integerValue")
                  if 'booleanValue' in property_value['value']:
                      value = property_value['value']['booleanValue']
                      logger.info("value_type: booleanValue")
                  if 'stringValue' in property_value['value']:
                      value = property_value['value']['stringValue']
                      logger.info("value_type: stringValue")

                  quality = ""
                  if 'quality' in property_value:
                      quality = property_value['quality']
                      logger.debug("quality in payload")
                  
                  timestamp = ""
                  if 'timestamp' in property_value and 'timeInSeconds' in property_value['timestamp']:
                      ts = property_value['timestamp']
                      # offsetInNanos may be absent; treat it as 0 in that case
                      fraction = ts.get('offsetInNanos', 0) / 1000000000 # fractional seconds, e.g. 0.157
                      unixtime = ts['timeInSeconds'] + fraction # e.g. 1655307221.157
                      # Convert the epoch time straight to JST, independent of the runtime's local time zone
                      dt = datetime.fromtimestamp(unixtime, tz=JST)
                      timestamp = dt.isoformat(timespec='milliseconds') # e.g. 2022-06-16T00:33:41.157+09:00
                      logger.debug("timestamp: {}".format(timestamp))

                  row = {}
                  row['propertyAlias'] = property_alias
                  row['value'] = value
                  row['timestamp'] = timestamp
                  row['quality'] = quality
                  logger.debug("row: {}".format(row))
                  transformed.append(row)

              logger.info("transformed: {}\n".format(json.dumps(transformed, indent=2)))
              return transformed # the result must be returned as a list ([])


  LambdaPermission:
    Type: AWS::Lambda::Permission
    Condition: CreateLambdaResources
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt LambdaFunctionIoTAnalyticsActivity.Arn
      Principal: iotanalytics.amazonaws.com


###############################################################
# Custom Resources
###############################################################
# ------------------------------------------------------------#
# IAM Policy for CloudFormation custom resource Lambda 
# ------------------------------------------------------------# 
  PolicyForCustomeResourceDeleteS3Object:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
        - Effect: Allow
          Action:
          - s3:DeleteObject
          - s3:List*
          Resource:
          - !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}
          - !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}/*
          - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}
          - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}/*
          - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDataset}
          - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDataset}/*

# ------------------------------------------------------------#
# Custom Resources
# ------------------------------------------------------------# 
  # Custom resources that empty each bucket when the stack is deleted
  # Empty the bucket for IoT Analytics Channel
  LambdaUsedToDeleteBucketForIoTAnalyticsChannel:
    Type: Custom::cleanupbucket
    Properties:
      ServiceToken: !GetAtt LambdaFunctionS3BucketDelete.Arn
      BucketName: !Ref BucketForIoTAnalyticsChannel
  # Empty the bucket for IoT Analytics Datastore
  LambdaUsedToDeleteBucketForIoTAnalyticsDatastore:
    Type: Custom::cleanupbucket
    Properties:
      ServiceToken: !GetAtt LambdaFunctionS3BucketDelete.Arn
      BucketName: !Ref BucketForIoTAnalyticsDatastore
  # Empty the bucket for IoT Analytics Dataset
  LambdaUsedToDeleteBucketForIoTAnalyticsDataset:
    Type: Custom::cleanupbucket
    Properties:
      ServiceToken: !GetAtt LambdaFunctionS3BucketDelete.Arn
      BucketName: !Ref BucketForIoTAnalyticsDataset

# ------------------------------------------------------------#
# Custom resource Lambda
# ------------------------------------------------------------# 
# Lambda for S3 bucket delete custom resource
  # Lambda Role
  LambdaExecutionRoleCustomResource:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      ManagedPolicyArns:
        - !Sub 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
        - !Ref PolicyForCustomeResourceDeleteS3Object

  # Lambda Function
  LambdaFunctionS3BucketDelete:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub ${NamePrefix}-delete-s3
      Handler: index.lambda_handler
      Role: !GetAtt LambdaExecutionRoleCustomResource.Arn
      Runtime: python3.9
      Timeout: 900
      Code:
        ZipFile: |
          import json
          import boto3
          import cfnresponse

          def lambda_handler(event, context):
              try:
                  bucket_name = event['ResourceProperties']['BucketName']

                  if event['RequestType'] == 'Delete':
                      # Empty the bucket so that CloudFormation can delete it
                      s3 = boto3.resource('s3')
                      bucket = s3.Bucket(bucket_name)
                      bucket.objects.all().delete()
                      #bucket.object_versions.delete() # if needed (versioned buckets)

                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Response': 'Success'})

              except Exception as e:
                  print(e)
                  cfnresponse.send(event, context, cfnresponse.FAILED, {'Response': 'Failed'})


# ------------------------------------------------------------#
# AWS IoT Analytics
# ------------------------------------------------------------# 
  # IoT Analytics Channel
  IoTAnalyticsChannel:
    Type: AWS::IoTAnalytics::Channel
    Properties:
      ChannelName: !Sub ${NamePrefix}_channel
      ChannelStorage:
        CustomerManagedS3:
          Bucket: !Ref BucketForIoTAnalyticsChannel
          RoleArn: !GetAtt RoleForIoTAnalyticsChannelToBucket.Arn
    DependsOn: BucketForIoTAnalyticsChannel

  # IoT Analytics Datastore
  IoTAnalyticsDatastore:
    Type: AWS::IoTAnalytics::Datastore
    Properties:
      DatastoreName: !Sub ${NamePrefix}_datastore
      DatastoreStorage:
        CustomerManagedS3:
          Bucket: !Ref BucketForIoTAnalyticsDatastore
          RoleArn: !GetAtt RoleForIoTAnalyticsDatastoreToBucket.Arn
    DependsOn: BucketForIoTAnalyticsDatastore

  # IoT Analytics Pipeline (No Lambda Activity)
  IoTAnalyticsPipelineLambdaFalse:
    Type: AWS::IoTAnalytics::Pipeline
    Condition: SkipLambdaResources
    Properties:
      PipelineName: !Sub ${NamePrefix}_pipeline
      PipelineActivities:
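        - Channel:
            # Specify the data source
            Name: pipeline-channel-activity
            ChannelName: !Sub ${NamePrefix}_channel
            Next: pipeline-datastore-activity
          Datastore:
            # Specify where the data is stored
            Name: pipeline-datastore-activity
            DatastoreName: !Sub ${NamePrefix}_datastore
    # Resource names above are built with !Sub, so declare the dependencies explicitly
    DependsOn:
      - IoTAnalyticsChannel
      - IoTAnalyticsDatastore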

  # IoT Analytics Pipeline (with Lambda Activity)
  IoTAnalyticsPipelineLambdaTrue:
    Type: AWS::IoTAnalytics::Pipeline
    Condition: CreateLambdaResources
    Properties:
      PipelineName: !Sub ${NamePrefix}_pipeline
      PipelineActivities:
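        - Channel:
            # Specify the data source
            Name: pipeline-channel-activity
            ChannelName: !Sub ${NamePrefix}_channel
            Next: pipeline-lambda-activity
          Lambda:
            # Use Lambda to convert the TQV data into JSON with all keys flattened to the same level
            Name: pipeline-lambda-activity
            BatchSize: 1
            LambdaName: !Sub ${NamePrefix}-pipeline-lambda-function
            Next: pipeline-datastore-activity
          Datastore:
            # Specify where the data is stored
            Name: pipeline-datastore-activity
            DatastoreName: !Sub ${NamePrefix}_datastore
    # Resource names above are built with !Sub, so declare the dependencies explicitly
    DependsOn:
      - IoTAnalyticsChannel
      - IoTAnalyticsDatastore
      - LambdaPermission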

  IoTAnalyticsDataset:
    Type: AWS::IoTAnalytics::Dataset
    Properties:
      DatasetName: !Sub ${NamePrefix}_dataset
      Actions:
        - ActionName: SqlAction
          QueryAction:
            # With the Lambda activity, select the flattened columns; otherwise select every column.
            # Note: the query has no WHERE clause, so it reads all data in the datastore.
            SqlQuery: !If [ CreateLambdaResources, !Sub "SELECT propertyAlias, value, timestamp, quality FROM ${NamePrefix}_datastore", !Sub "SELECT * FROM ${NamePrefix}_datastore" ]
      RetentionPeriod:
        # Number of days to retain dataset contents
        NumberOfDays: 1
        Unlimited: false
      Triggers:
        # Refresh the dataset automatically on a schedule
        - Schedule:
            ScheduleExpression: rate(5 minutes)
      # Deliver the dataset contents to the customer-managed S3 bucket
      ContentDeliveryRules:
        - Destination:
            S3DestinationConfiguration:
              Bucket: !Ref BucketForIoTAnalyticsDataset
              Key: '!{iotanalytics:scheduleTime}/!{iotanalytics:versionId}.csv'
              RoleArn: !GetAtt RoleForIoTAnalyticsDatasetToBucket.Arn
        #EntryName: String
    DependsOn: IoTAnalyticsDatastore
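
To make the effect of the Lambda activity concrete, here is a minimal standalone sketch of the same flattening that can be run outside AWS. The helper name `flatten_tqv`, the sample entry, and the alias `/factory/line1/temperature` are assumptions introduced for illustration; they are not part of the template.

```python
from datetime import datetime, timedelta, timezone

JST = timezone(timedelta(hours=9), "JST")

# Value keys that the Lambda in the template looks for
VALUE_KEYS = ("doubleValue", "integerValue", "booleanValue", "stringValue")


def flatten_tqv(entry):
    """Flatten one entry into a single-level dict (illustrative helper)."""
    pv = entry["propertyValues"][0]  # like the template, only the first value is used
    value = next((pv["value"][k] for k in VALUE_KEYS if k in pv["value"]), "")
    ts = pv.get("timestamp", {})
    timestamp = ""
    if "timeInSeconds" in ts:
        # offsetInNanos is treated as 0 when it is missing
        seconds = ts["timeInSeconds"] + ts.get("offsetInNanos", 0) / 1_000_000_000
        timestamp = datetime.fromtimestamp(seconds, tz=JST).isoformat(timespec="milliseconds")
    return {
        "propertyAlias": entry["propertyAlias"],
        "value": value,
        "timestamp": timestamp,
        "quality": pv.get("quality", ""),
    }


# Hypothetical sample entry, shaped like the payload the Lambda reads
sample = {
    "propertyAlias": "/factory/line1/temperature",
    "propertyValues": [
        {
            "value": {"doubleValue": 23.5},
            "timestamp": {"timeInSeconds": 1655307221, "offsetInNanos": 157000000},
            "quality": "GOOD",
        }
    ],
}

row = flatten_tqv(sample)
print(row)
```

Each nested entry becomes one flat row with `propertyAlias`, `value`, `timestamp`, and `quality`, which are the columns the dataset's SQL query selects when the Lambda activity is enabled.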

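Conclusion

Single-step setup makes it easy to create an IoT Analytics environment, but it does not let you look at the stored data directly. With this template, I can now quickly spin up the environment I need, with user-managed buckets, whenever I want.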
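
If you want to create the stack from a script rather than the console, a sketch along the following lines should work. The helper names (`build_stack_args`, `deploy`), the stack name, and the prefix `demo01` are assumptions for illustration. Because the template sets `RoleName` on its IAM roles, the request needs the `CAPABILITY_NAMED_IAM` capability.

```python
import json


def build_stack_args(stack_name, name_prefix, template_body, use_lambda=True):
    """Build keyword arguments for CloudFormation create_stack (illustrative helper)."""
    return {
        "StackName": stack_name,
        "TemplateBody": template_body,
        "Parameters": [
            {"ParameterKey": "NamePrefix", "ParameterValue": name_prefix},
            {
                "ParameterKey": "UseLambda",
                "ParameterValue": "UseLambda" if use_lambda else "NoUseLambda",
            },
        ],
        # The template creates IAM roles with explicit names
        "Capabilities": ["CAPABILITY_NAMED_IAM"],
    }


def deploy(stack_name, name_prefix, template_path, use_lambda=True):
    """Create the stack and wait until it completes. Needs boto3 and AWS credentials."""
    import boto3  # imported lazily so build_stack_args works without boto3

    with open(template_path, encoding="utf-8") as f:
        args = build_stack_args(stack_name, name_prefix, f.read(), use_lambda)
    cfn = boto3.client("cloudformation")
    cfn.create_stack(**args)
    cfn.get_waiter("stack_create_complete").wait(StackName=stack_name)


# Build the arguments only; nothing is sent to AWS here
args = build_stack_args("iot-analytics-demo", "demo01", "<template body>")
print(json.dumps(args["Parameters"], indent=2))
```

`TemplateBody` has a size limit, so if the template grows past it, uploading the file to S3 and passing `TemplateURL` instead is the usual alternative.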

I hope this helps when you use IoT Analytics.

That's all.
